
[Do Not Merge]: adding diffuser router for diffusion RL#544

Closed
sniper35 wants to merge 14 commits into radixark:main from sniper35:diffuser-router

Conversation


@sniper35 sniper35 commented Feb 2, 2026

WIP, to close #541

  • Develop the miles-diffusion-router script/module.

  • Implement basic load balancing logic (least-request).

  • Create a demo script showing the router coordinating multiple sglang-diffusion backends; test results to follow.

  • Document the setup process and API usage (examples/diffusion_router/).

@gemini-code-assist
Contributor

Summary of Changes

Hello @sniper35, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the "Miles Diffusion Router," a new component designed to efficiently distribute requests across multiple sglang-diffusion worker instances. It employs a least-request load balancing algorithm, ensuring that incoming requests are routed to the least busy worker. The router also features a robust health check system that identifies and quarantines unhealthy workers, dynamically adjusting the routing pool to maintain service reliability. This enhancement significantly improves the scalability and fault tolerance of the diffusion model serving infrastructure.

Highlights

  • New Diffusion Router Module: Introduces a robust load-balancing solution for sglang-diffusion worker instances, enhancing scalability and reliability.
  • Least-Request Load Balancing: Implements an intelligent routing strategy that directs incoming requests to the worker with the fewest active requests, optimizing resource distribution and preventing hot-spotting.
  • Dynamic Worker Management and Health Checks: Supports dynamic addition of workers via an API endpoint and includes a background health check mechanism to identify and quarantine unhealthy workers, ensuring high availability.
  • Comprehensive API and Documentation: Provides a set of API endpoints for managing workers, checking health, and proxying generation requests, complemented by detailed README.md documentation for easy setup and usage.
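The least-request strategy from the highlights can be sketched in a few lines. This is an illustration of the idea, not the PR's actual code; the class and method names are assumptions (only `worker_request_counts` mirrors a counter the PR itself mentions):

```python
# Minimal sketch of least-request worker selection (illustrative names).
from contextlib import contextmanager

class LeastRequestBalancer:
    def __init__(self):
        self.worker_request_counts = {}  # worker URL -> in-flight requests

    def add_worker(self, url: str) -> None:
        self.worker_request_counts.setdefault(url, 0)

    def pick_worker(self) -> str:
        # Route to the worker with the fewest in-flight requests.
        if not self.worker_request_counts:
            raise RuntimeError("no workers registered")
        return min(self.worker_request_counts, key=self.worker_request_counts.get)

    @contextmanager
    def track(self, url: str):
        # Increment while the proxied request is in flight, decrement when
        # done, so the load state stays accurate in real time.
        self.worker_request_counts[url] += 1
        try:
            yield
        finally:
            self.worker_request_counts[url] -= 1
```

With two idle workers, sending one request through `track` makes `pick_worker` prefer the other worker until the first request completes.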
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new diffusion router for load-balancing requests across multiple worker instances. The implementation includes least-request routing, background health checks, and worker quarantine capabilities. The changes are well-structured, with a core router implementation, a demo script, and documentation.

My review focuses on a critical issue in the health check logic that prevents dead workers from being resurrected, which could impact high availability. I've also provided suggestions to improve code structure by reducing duplication and improving encapsulation, simplifying some logic for better readability, enhancing API consistency, and clarifying the documentation.

| `POST` | `/add_worker` | Register a diffusion worker (`?url=...` or JSON body) |
| `GET` | `/list_workers` | List registered workers |
| `POST` | `/update_weights_from_disk` | Broadcast weight reload to all workers |
| `*` | `/{path}` | Catch-all proxy to least-loaded worker |
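For reference, the documented endpoints could be exercised roughly like this. The port matches the documented default of 30080; the worker URL, path, and payload are made-up placeholders, not values from the PR:

```shell
# Register a worker (query-string form; per the table above, a JSON body
# {"url": ...} also works).
curl -X POST "http://localhost:30080/add_worker?url=http://localhost:30001"

# List registered workers.
curl "http://localhost:30080/list_workers"

# Broadcast a weight reload to all workers.
curl -X POST "http://localhost:30080/update_weights_from_disk"

# Any other path is proxied to the least-loaded worker (payload illustrative).
curl -X POST "http://localhost:30080/generate" -d '{"prompt": "a cat"}'
```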
Contributor

medium

In the API Endpoints table, the method for the catch-all proxy is listed as *. This is not a standard HTTP method. For clarity, you should specify ANY or list the actual methods supported by the implementation, which are GET, POST, PUT, DELETE.

Suggested change
| `*` | `/{path}` | Catch-all proxy to least-loaded worker |
| `GET, POST, PUT, DELETE` | `/{path}` | Catch-all proxy to least-loaded worker |

Collaborator

This should be changed as Gemini suggested.

--port Port (default: 30080)
--worker-urls Initial worker URLs
--max-connections Max concurrent connections (default: 100)
--timeout Request timeout in seconds
Contributor

medium

The description for the --timeout option is "Request timeout in seconds", which is ambiguous. It's not clear if this applies to client-to-router connections or router-to-worker connections. Based on the code, it's for router-to-worker requests. Please clarify this in the documentation for better user understanding.

Suggested change
--timeout Request timeout in seconds
--timeout Request timeout in seconds for router-to-worker requests

try:
    data = json.loads(content)
    return JSONResponse(content=data, status_code=response.status_code, headers=resp_headers)
except Exception:
Contributor

medium

Catching a broad Exception can hide bugs or unexpected errors. It's better to catch the specific json.JSONDecodeError that you expect to happen when the response content is not valid JSON. This makes the code's intent clearer and prevents unintended error suppression.

        except json.JSONDecodeError:
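A tiny self-contained illustration of the suggested narrowing (the function name is made up for the example): only malformed JSON takes the fallback path, while any unrelated error still propagates instead of being swallowed.

```python
import json

def parse_or_raw(content: bytes):
    """Return parsed JSON if possible, otherwise the raw bytes unchanged."""
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Only "content is not valid JSON" is expected here; any other
        # exception (e.g. a genuine bug) is not silently suppressed.
        return content
```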

Comment on lines +232 to +240
worker_url = request.query_params.get("url") or request.query_params.get("worker_url")

if not worker_url:
    body = await request.body()
    try:
        payload = json.loads(body) if body else {}
    except json.JSONDecodeError:
        return JSONResponse(status_code=400, content={"error": "Invalid JSON body"})
    worker_url = payload.get("url") or payload.get("worker_url")
Contributor

medium

The add_worker endpoint accepts both url and worker_url as parameter names, in both the query string and the JSON body. This can be confusing for API users. To improve clarity and consistency, it's best to stick to a single name. Since the documentation already uses url, I suggest removing support for worker_url.

        worker_url = request.query_params.get("url")

        if not worker_url:
            body = await request.body()
            try:
                payload = json.loads(body) if body else {}
            except json.JSONDecodeError:
                return JSONResponse(status_code=400, content={"error": "Invalid JSON body"})
            worker_url = payload.get("url")

Comment on lines +264 to +281
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Miles Diffusion Router")
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int, default=30080)
    parser.add_argument("--worker-urls", nargs="*", default=[], help="Initial worker URLs to register")
    parser.add_argument("--max-connections", type=int, default=100)
    parser.add_argument("--timeout", type=float, default=None)
    parser.add_argument("--health-check-interval", type=int, default=10)
    parser.add_argument("--health-check-failure-threshold", type=int, default=3)
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args()

    router = DiffusionRouter(args, verbose=args.verbose)
    for url in args.worker_urls:
        router.worker_request_counts[url] = 0
        router.worker_failure_counts[url] = 0

    uvicorn.run(router.app, host=args.host, port=args.port, log_level="info")
Contributor

medium

The if __name__ == "__main__" block contains logic for argument parsing and pre-registering workers that is nearly identical to the logic in examples/diffusion_router/demo.py. This code duplication makes maintenance harder.

Furthermore, this block (and the demo script) directly manipulates the router's internal state (worker_request_counts, worker_failure_counts), which breaks encapsulation.

Consider removing this __main__ block to establish diffusion_router.py as a pure library module and rely on demo.py as the single executable example. This would improve code structure and maintainability.

| `POST` | `/add_worker` | Register a diffusion worker (`?url=...` or JSON body) |
| `GET` | `/list_workers` | List registered workers |
| `POST` | `/update_weights_from_disk` | Broadcast weight reload to all workers |
| `*` | `/{path}` | Catch-all proxy to least-loaded worker |
Collaborator

This should be changed as Gemini suggested.


The router uses a **least-request** strategy: each incoming request is forwarded to the worker with the fewest in-flight requests. This is workload-aware and avoids hot-spotting compared to round-robin. When a request completes, the worker's count is decremented, keeping the load state accurate in real time.

Workers that fail consecutive health checks (default: 3) are quarantined and excluded from the routing pool. A background loop pings each worker's `GET /health` endpoint at a configurable interval (default: 10s).
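The quarantine rule described above (consecutive failures up to a threshold, checked by a background loop) reduces to a small bookkeeping step. The function name and dict-based state below are assumptions for illustration, not the PR's actual internals; only the threshold of 3 matches the documented default:

```python
FAILURE_THRESHOLD = 3  # consecutive failed health checks before quarantine

def record_health(active: dict, quarantined: set, failures: dict,
                  url: str, healthy: bool) -> None:
    """Update bookkeeping after one GET /health probe of a worker."""
    if healthy:
        failures[url] = 0  # a success resets the consecutive-failure streak
        return
    failures[url] = failures.get(url, 0) + 1
    if failures[url] >= FAILURE_THRESHOLD and url in active:
        # Exclude the worker from the routing pool.
        quarantined.add(url)
        del active[url]
```

Note that this sketch never moves a URL back out of `quarantined`; re-admitting a recovered worker would need an explicit, deliberate policy.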
Collaborator

As I mentioned to Ratish @Ratish1 here:

#260 (comment)

Please do not add failed workers back easily.

@zhaochenyang20
Collaborator

Thanks for the help @sniper35. Could you ping me on WeChat if you have my contact? Also, this PR will be held for a while under confidential experiment. We will acknowledge your PR in our blog. Thanks!

@zhaochenyang20
Collaborator

Also, please clean up your code and documentation.

@zhaochenyang20
Collaborator

Hey, could you try to use this to profile the scalability of your router?

python/sglang/multimodal_gen/benchmarks/bench_serving.py

in sglang main repo.

The idea is to benchmark the scalability and accuracy of your router: say, use the router to launch 1/2/4/8 diffusion servers, provide 1000 requests, and set concurrency to 1/10/100, etc.

@sniper35
Author

sniper35 commented Feb 6, 2026

> Hey, could you try to use this to profile the scalability of your router?
>
> python/sglang/multimodal_gen/benchmarks/bench_serving.py
>
> in sglang main repo.
>
> The idea is to benchmark the scalability and accuracy of your router: say, use the router to launch 1/2/4/8 diffusion servers, provide 1000 requests, and set concurrency to 1/10/100, etc.

Sure, will get the test results ASAP. Thanks for your immediate response!

@sniper35
Author

sniper35 commented Feb 7, 2026

@zhaochenyang20 When using SD3 with a small (max_concurrency / num_workers) ratio and a small num_prompts, in which case the hardware is underutilized, performance with the router is actually worse than without it. When I increased max_concurrency (24) / num_workers (8) to a ratio of 3 and raised num_prompts to 1920, it started to perform slightly better than without the router. I need more compute resources to run larger experiments, but hopefully this says something about the advantage brought by even a very basic router. I'll collect more benchmark data and update the table.

Model: stabilityai/stable-diffusion-3-medium-diffusers
Server spec: Lambda 8xH100 (80GB)
[Screenshot: benchmark results, 2026-02-07]

aggregation.csv

@zhaochenyang20
Collaborator

> @zhaochenyang20 When using SD3 with a small (max_concurrency / num_workers) ratio and a small num_prompts, in which case the hardware is underutilized, performance with the router is actually worse than without it. When I increased max_concurrency (24) / num_workers (8) to a ratio of 3 and raised num_prompts to 1920, it started to perform slightly better than without the router. I need more compute resources to run larger experiments, but hopefully this says something about the advantage brought by even a very basic router. I'll collect more benchmark data and update the table.
>
> Model: stabilityai/stable-diffusion-3-medium-diffusers Server spec: Lambda 8xH100 (80GB) [Screenshot: benchmark results, 2026-02-07]
>
> aggregation.csv

Nicely done. Thanks for your contribution. I do not think there would be a case where using a router could be worse. My assumption is that we keep the number of servers fixed, like always at 8; then the router is just adding routing algorithms.

In other words, could you try to compare round-robin vs. least-workload routing? I think the latter would always be better.
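As a toy deterministic model of the comparison being requested (an illustration, not a real benchmark), the makespan gap between round-robin and least-loaded dispatch shows up as soon as worker latencies differ:

```python
from itertools import cycle

def makespan_round_robin(latencies, n_requests):
    # Dispatch requests to workers in fixed rotation, ignoring load.
    finish = [0.0] * len(latencies)
    worker_cycle = cycle(range(len(latencies)))
    for _ in range(n_requests):
        w = next(worker_cycle)
        finish[w] += latencies[w]
    return max(finish)  # time until the last worker drains its queue

def makespan_least_loaded(latencies, n_requests):
    # Dispatch each request to the currently least-loaded worker.
    finish = [0.0] * len(latencies)
    for _ in range(n_requests):
        w = min(range(len(latencies)), key=lambda i: finish[i])
        finish[w] += latencies[w]
    return max(finish)
```

With one worker taking 1s per request and another taking 3s, and 8 requests, round-robin finishes at 12s while least-loaded dispatch finishes at 6s.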

At the same time, I am reviewing this PR:

sgl-project/sglang#18078

After this is merged, please rebase your code and implement the update_weights_from_disk API in the router. Note that this API should be broadcast to all workers.

@sniper35
Author

sniper35 commented Feb 8, 2026

@zhaochenyang20

[Screenshot: routing algorithm comparison, 2026-02-07]

I found the issue in the test script, refactored it, and produced the comparison files by running least-request against round-robin and random selection.
routing_algorithm_comparison.csv
bench_least-request.json
bench_random.json
bench_round-robin.json

@sniper35 sniper35 marked this pull request as ready for review February 9, 2026 04:37
@sniper35 sniper35 requested a review from fzyzcjy as a code owner February 9, 2026 04:37
@zhaochenyang20
Collaborator

> @zhaochenyang20
>
> [Screenshot: routing algorithm comparison, 2026-02-07] I found the issue in the test script, refactored it, and produced the comparison files by running least-request against round-robin and random selection. [routing_algorithm_comparison.csv](https://github.com/user-attachments/files/25159428/routing_algorithm_comparison.csv) [bench_least-request.json](https://github.com/user-attachments/files/25159430/bench_least-request.json) [bench_random.json](https://github.com/user-attachments/files/25159431/bench_random.json) [bench_round-robin.json](https://github.com/user-attachments/files/25159432/bench_round-robin.json)

This seems great and reasonable. We will merge this:

sgl-project/sglang#18154

After merging this, would you like to use the server's new profiling tools to evaluate your system? Or perhaps you have already done so, which would be great!

For this PR, we will hold it for a while, and in the long run we will invite you to a private fork. We can merge this PR into that fork. Thanks!

@sniper35
Author

> @zhaochenyang20
> [Screenshot: routing algorithm comparison, 2026-02-07]
> I found the issue in the test script, refactored it, and produced the comparison files by running least-request against round-robin and random selection. routing_algorithm_comparison.csv bench_least-request.json bench_random.json bench_round-robin.json
>
> This seems great and reasonable. We will merge this:
>
> sgl-project/sglang#18154
>
> After merging this, would you like to use the server's new profiling tools to evaluate your system? Or perhaps you have already done so, which would be great!
>
> For this PR, we will hold it for a while, and in the long run we will invite you to a private fork. We can merge this PR into that fork. Thanks!

Sure, sounds good. I'll wait for sgl-project/sglang#18154 to be merged and then evaluate using the new profiling tools. Thanks for your quick response!

@zhaochenyang20
Collaborator

https://github.com/zhaochenyang20/sglang-diffusion-routing/invitations

Hey @sniper35 @klhhhhh

Since this PR has been blocked by the actual diffusion RL pipeline, we want to detach it from Miles for now and move it to an independent testing repo for quick development. Would you mind if we acknowledge your code and move it to https://github.com/zhaochenyang20/sglang-diffusion-routing? It would be much more convenient to make a Python binding for it 😂 Thanks so much!

By the way, if you would like to connect with me in person, let's discuss some details. Thanks! You can reach out to me on WeChat at 18015766633.

@zhaochenyang20 zhaochenyang20 changed the title WIP: diffuser router [Do Not Merge]: adding diffuser router for diffusion Rl Feb 13, 2026
@zhaochenyang20 zhaochenyang20 changed the title [Do Not Merge]: adding diffuser router for diffusion Rl [Do Not Merge]: adding diffuser router for diffusion RL Feb 13, 2026
@sniper35
Author

sniper35 commented Feb 17, 2026

> https://github.com/zhaochenyang20/sglang-diffusion-routing/invitations
>
> Hey @sniper35 @klhhhhh
>
> Since this PR has been blocked by the actual diffusion RL pipeline, we want to detach it from Miles for now and move it to an independent testing repo for quick development. Would you mind if we acknowledge your code and move it to https://github.com/zhaochenyang20/sglang-diffusion-routing? It would be much more convenient to make a Python binding for it 😂 Thanks so much!
>
> By the way, if you would like to connect with me in person, let's discuss some details. Thanks! You can reach out to me on WeChat at 18015766633.

@zhaochenyang20 It seems @klhhhhh has already made a branch, https://github.com/zhaochenyang20/sglang-diffusion-routing/tree/kl_dev, for it. Is this what you are looking for? Thanks @klhhhhh



Development

Successfully merging this pull request may close these issues.

[Feature] Implement "Miles Diffusion Router" for Workload-Aware Rollouts
